package gu.simplemq;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import gu.simplemq.exceptions.SmqUnsubscribeException;

/**
 * Decorator for an {@link IMessageAdapter} instance.<br>
 * Message handling is handed off to a thread pool that has exactly one worker thread,
 * so messages are processed in the order in which they were received.
 * <p>
 * The executor is a static field, so every decorator instance shares the same single
 * worker thread and the same (unbounded) task queue.
 * <p>
 * Because the delegate runs on the worker thread, anything it throws, including
 * {@link SmqUnsubscribeException}, is caught and printed there and is never
 * propagated to the caller of {@link #onSubscribe(Object)}.
 *
 * @param <T> type of the subscribed message
 * @author guyadong
 *
 */
public class IMessageAdapterDecorator<T> implements IMessageAdapter<T> {
	/**
	 * The decorated adapter, may be {@code null}.
	 * Declared volatile because it is written through {@link #setAdapter(IMessageAdapter)}
	 * and the instance it refers to is invoked on the executor's worker thread.
	 */
	private volatile IMessageAdapter<T> adapter;
	/** Thread pool fixed at a single thread, so that all messages are handled in the order received */
	private static final Executor executor = MoreExecutors.getExitingExecutorService(
			new ThreadPoolExecutor(1, 1,
	                0L, TimeUnit.MILLISECONDS,
	                new LinkedBlockingQueue<Runnable>(),
	                new ThreadFactoryBuilder().setNameFormat("message-adapter-%d").build()));

	/**
	 * Creates a decorator without a delegate; messages are ignored until
	 * {@link #setAdapter(IMessageAdapter)} supplies one.
	 */
	public IMessageAdapterDecorator() {
		this(null);
	}

	/**
	 * Creates a decorator around the given adapter.
	 *
	 * @param adapter the adapter to decorate, may be {@code null}
	 */
	public IMessageAdapterDecorator(IMessageAdapter<T> adapter) {
		setAdapter(adapter);
	}

	/**
	 * @return the decorated adapter, or {@code null} if none is set
	 */
	public IMessageAdapter<T> getAdapter() {
		return adapter;
	}

	/**
	 * Replaces the decorated adapter.
	 * Messages already queued keep the adapter that was set when they were received.
	 *
	 * @param adapter the new adapter, may be {@code null} to ignore subsequent messages
	 * @return this instance, to allow call chaining
	 */
	public IMessageAdapterDecorator<T> setAdapter(IMessageAdapter<T> adapter) {
		this.adapter = adapter;
		return this;
	}

	/**
	 * Queues the message for handling by the decorated adapter on the executor
	 * returned by {@link #getExecutor()}. Does nothing when no adapter is set.
	 *
	 * @param t the received message
	 * @throws SmqUnsubscribeException declared by the interface; this implementation
	 *         catches everything the delegate throws inside the submitted task
	 */
	@Override
	public void onSubscribe(final T t) throws SmqUnsubscribeException {
		// Snapshot the field once: the null check and the later call on the worker
		// thread must see the same instance, otherwise a concurrent setAdapter(null)
		// between the check and the execution would cause a NullPointerException.
		final IMessageAdapter<T> target = adapter;
		if(target == null){
			return;
		}
		// hand the message handling over to the executor
		getExecutor().execute(new Runnable(){
			@Override
			public void run() {
				try{
					target.onSubscribe(t);
				} catch (Throwable e) {
					// keep the worker thread alive whatever the delegate throws
					e.printStackTrace();
				}
			}});
	}

	/**
	 * Returns the executor used to run message handling.
	 * Subclasses may override this to supply a different executor.
	 *
	 * @return the shared single-thread executor
	 */
	protected Executor getExecutor() {
		return executor;
	}

	/**
	 * Wraps the given adapter in a decorator unless it already is one.
	 *
	 * @param <T> type of the subscribed message
	 * @param adapter the adapter to wrap, may be {@code null}
	 * @return {@code adapter} itself when it is already an {@link IMessageAdapterDecorator},
	 *         otherwise a new decorator around it
	 */
	public static <T> IMessageAdapterDecorator<T> makeInstance(IMessageAdapter<T> adapter){
		if(adapter instanceof IMessageAdapterDecorator){
			return (IMessageAdapterDecorator<T>)adapter;
		}
		return new IMessageAdapterDecorator<T>(adapter);
	}
}
